package com.k2de.consumer.impl;

import com.k2data.k2de.utils.KMXFactory;
import com.k2data.platform.ddm.sdk.builder.KMXRecord;
import com.k2data.platform.ddm.sdk.client.KMXClient;
import com.k2data.platform.ddm.sdk.exception.NullRecordException;
import com.k2data.platform.ddm.sdk.exception.SendException;
import com.k2de.consumer.inter.KafkaSenderInter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 发往KMX正常区的kafkaProducer.
 *
 * Created by chenjingshuai on 18-1-9.
 */
public class KMXSenderImpl implements KafkaSenderInter<KMXRecord> {
    private static final Logger LOGGER = LoggerFactory.getLogger(KMXSenderImpl.class);

    // 正常topic的kafka client.
    private KMXClient kmxClient;

    @Override
    public void init(String kafkaServerUrl) {
        this.kmxClient = KMXFactory.createKmxClient(kafkaServerUrl);
    }

    @Override
    public void send(KMXRecord sendObj) {
        try {
            LOGGER.debug("Send KmxRecord to kmx.", sendObj);

            kmxClient.send(sendObj);

            LOGGER.debug("The process of sending kmxRecord success.");
        } catch (SendException | NullRecordException e) {
            LOGGER.warn("Send KmxRecord occurs exception:{}.", e.getMessage());
        }
    }
}
